package org.databandtech.flink.sink;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.client.Requests;

public class ESSinkFunction implements ElasticsearchSinkFunction<Object>{
	
	private static final long serialVersionUID = 5852594084789732123L;
	String INDEX ="";
	
	public ESSinkFunction(String iNDEX) {
		super();
		INDEX = iNDEX;
	}

	@Override
	public void process(Object element, RuntimeContext ctx, RequestIndexer indexer) {
		indexer.add(Requests.indexRequest()
                .index(INDEX)  //es 索引名
                .source(element)); 
		
	}

}
